Appearance
使用 手动 ACK + 阻塞式重试 的方式实现 Kafka 原生重试
ACK 模式配置
声明配置类后,在配置类声明相关属性配置
代码示例(某个项目中), 禁用自动提交,配合手动 ACK 使用
`stringObjectMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);- 使用 手动 ACK 模式
- 必须显式调用 ack.acknowledge() 才能提交 offset
- 如果不调用 ack:消息 offset 不会提交,下次重启服务会重新消费该消息
ack 跟服务有关系,跟重试关系不大
阻塞式重试配置
@Value(value = "${kafka.backoff.interval:1000}")
private Long interval; // 重试间隔,默认 1000ms(1秒)
@Value(value = "${kafka.backoff.max_failure:3}")
private Long maxAttempts; // 最大重试次数,默认 3 次
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(..., fixedBackOff);
// 可重试的异常
errorHandler.addRetryableExceptions(SocketTimeoutException.class, ...);
// 不可重试的异常
errorHandler.addNotRetryableExceptions(NullPointerException.class, ...);
return errorHandler;
}注意一下,这里是 addRetryableExceptions,不是 set;
如果这里是没有明确声明的不可重试异常,他的默认机制也是会走重试操作的。
Consumer 配置
// 两次 poll 之间最大间隔,超过触发 rebalance
stringObjectMap.put("max.poll.interval.ms", "60000"); // 60秒
// session 超时时间,超过服务端认为消费者离线
stringObjectMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30秒
// 每次 poll 最大记录数
stringObjectMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // 10条项目配置文件中自定义参数配置
# 启动kafka消费者
kafka:
consumer:
auto-start: false # 是否自动启动消费者
#可通过配置文件覆盖的参数
kafka:
backoff:
interval: 1000 # 重试间隔(毫秒),默认1秒
max_failure: 3 # 最大重试次数,默认3次
consumer:
auto-start: true # 是否自动启动重试机制流程
场景 1:消费过程中抛异常
- 消费方法抛出异常
- DefaultErrorHandler 拦截异常
- 判断是否为可重试异常
- 如果可重试,等待 interval 时间后同步阻塞重试
- 最多重试 maxAttempts 次
- 重试成功或达到最大次数后记录日志
- 消息被视为处理完成,继续消费下一条
场景 2:不调用 ack.acknowledge()
不 ack 的后果:
- 当前消息 offset 不会提交 到 Kafka
- 服务重启后,会从上次提交的 offset 开始重新消费
- 但在运行期间不会自动重试(除非抛异常触发 ErrorHandler)
⬆️ 这里 ack 的作用,更多的还是 消息 offset 的下表是否 commit,跟重试关系不大。
场景 3:超过 max.poll.interval.ms
- 如果消费逻辑耗时超过 60 秒(默认 max.poll.interval.ms)
- Kafka 认为消费者已死,触发 Rebalance
- 消息会被分配给其他消费者实例
- 原消费者会收到 CommitFailedException
代码解释 🐕🦺
解释一段项目中项目配置类中使用到的代码
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
log.error("kafka消费失败, topic: {}, partition: {}, offset: {}, key: {}, value: {}",
consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.key(), consumerRecord.value(), ExceptionUtils.getStackTrace(e));
}, fixedBackOff);
errorHandler.addRetryableExceptions(SocketTimeoutException.class, BeanCreationNotAllowedException.class, DataAccessResourceFailureException.class);
errorHandler.addNotRetryableExceptions(NullPointerException.class,GenericException.class);
return errorHandler;
}- 定义重试机制:
FixedBackOff- FixedBackOff:它是一个固定间隔的阻塞重试策略,重试的间隔时间和最大重试次数通过
interval和maxAttempts指定
- FixedBackOff:它是一个固定间隔的阻塞重试策略,重试的间隔时间和最大重试次数通过
- 配置 DefaultErrorHandler
- 配置可以重试与不可重试的异常
- addRetryableExceptions:设置哪些异常是可以进行重试的。如果这些异常在消费过程中抛出,Kafka 会按配置的重试机制进行处理。
- addNotRetryableExceptions:设置哪些异常是不可重试的。如果抛出这些异常,消息会被直接丢弃或写入死信队列,而不会重试。